/**
 * FileName: ProcessChain
 * Author:   SAMSUNG-PC 孙中军
 * Date:     2018/3/1 10:40
 * Description:过滤链
 */
package cn.com.bonc.process.chain;

import cn.com.bonc.process.Process;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

/**
 * An ordered chain of {@link Process} steps applied to a source {@code Dataset<Row>}.
 *
 * <p>Create a chain with {@link #setSourceData(Dataset)}, register steps with
 * {@link #addProcess(Process)} (calls can be chained), then call {@link #execute()}
 * to run every step in the order it was added, feeding each step the output of the
 * previous one.
 */
public class ProcessChain {

   /** Steps to run, in insertion order. Never {@code null}. */
   private final List<Process> processList;

   /** The source data handed to the first step. Never reassigned by this class. */
   private final Dataset<Row> rowDataset;


   /**
    * Creates an empty chain over the given source data.
    *
    * @param rowDataset the dataset the first step will receive; stored as-is, not validated
    */
   private ProcessChain(Dataset<Row> rowDataset) {
      this.rowDataset = rowDataset;
      this.processList = new ArrayList<>();
   }

   /**
    * Creates a new, empty chain whose source data is {@code rowDataset}.
    *
    * @param rowDataset the dataset to be processed by the chain
    * @return a new chain with no steps registered
    */
   public static ProcessChain setSourceData(Dataset<Row> rowDataset) {
      return new ProcessChain(rowDataset);
   }

   /**
    * Appends a step to the end of the chain.
    *
    * @param process the step to add; must not be {@code null}
    * @return this chain, so that calls can be chained
    * @throws NullPointerException if {@code process} is {@code null}
    */
   public ProcessChain addProcess(Process process) {
      // Fail fast here: a null step would otherwise only surface later as an
      // unexplained NullPointerException in the middle of execute().
      processList.add(Objects.requireNonNull(process, "process must not be null"));
      return this;
   }

   /**
    * Runs every registered step in insertion order and returns the final result.
    *
    * <p>The first step receives the source data; each following step receives the
    * previous step's return value. The stored source data is not modified, so
    * calling this method again starts from the original source rather than from
    * an earlier result.
    *
    * @return the value returned by the last step, or the source data itself when
    *         no steps have been added
    */
   public Dataset<Row> execute() {
      // Fold into a local instead of the field so the chain stays re-executable.
      Dataset<Row> result = rowDataset;
      for (Process process : processList) {
         result = process.processing(result);
      }
      return result;
   }
}